Chaining Datasets

Warning

Dependencies are still under development. The current implementation has been added owing to its usefulness, but expect changes in subsequent releases. Every effort will be made to ensure the stability of the current methods; however, you should always check the release notes before upgrading versions if you have an important workflow running with dependencies.

Imagine a scenario where you want to run a heavy calculation which produces a very large output. If you wanted to postprocess those results, you could collect them with fetch_results and process them locally. Transferring them, however, could take a long time and uses up local disk space. Datasets have a better way of doing this, called “dependencies”.

Dependencies

Dependencies allow you to chain jobs together on the remote machine, each submitted with different parameters. For example, we can submit a calculation on 50 nodes, and then postprocess the result using only one CPU core.

Currently, dependencies are limited to a linear chain, on a one-to-one basis. This means jobs can be joined together in an A -> B -> C sense, with a continuous line of runners between them. While expansion is planned, let's look into the current implementation and how it can help our hypothetical scenario.

Let's begin by defining three functions this time:

[2]:
from remotemanager import Dataset, URL

def init(offset):
    return offset

def mult(x, y):
    offset = loaded
    return offset + (x * y)

def post():
    return f'The final result is {loaded}'

This workflow does three things:

  1. An “offset” is specified. Think of it as the c in a y = mx + c equation.

  2. Two numbers are multiplied together and added to the offset (your mx).

  3. The result is formatted into a string and returned.
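
To see what the chain computes, here is a minimal local sketch with loaded replaced by explicit argument passing; the _local helpers are purely illustrative and are not part of remotemanager:

def mult_local(loaded, x, y):
    # stand-in for mult, with the upstream return passed explicitly
    return loaded + (x * y)

def post_local(loaded):
    # stand-in for post
    return f'The final result is {loaded}'

# offset=10, x=5, y=2 reproduces the first result seen later in this page
print(post_local(mult_local(init(10), x=5, y=2)))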

An aside on loaded

loaded is a property that is added by the dependency network, and it simply allows a function to access the returned value of the function immediately before it in the chain.

Think of it as holding exactly what was returned by the upstream function:

[3]:
def multi_return():
    return 1, 2, 3

def multi_process():
    a, b, c = loaded

    return a + b + c

The output of the above functions would always be 6.

Creating your chained runners

Going back to our original workflow, let's create the datasets. This is done as normal:

[4]:
url = URL()  # again, a "local" url for testing purposes

dataset_init = Dataset(function = init,
                       name = 'init',
                       url = url,
                       remote_dir = 'temp_remote',
                       local_dir = 'temp_local',
                       mpi = 1,
                       omp = 1,
                       nodes = 1,
                       skip = False)

dataset_calc = Dataset(function = mult,
                       name = 'calc',
                       url = url,
                       remote_dir = 'temp_remote',
                       local_dir = 'temp_local',
                       mpi = 64,
                       omp = 4,
                       nodes = 50,
                       skip = False)

dataset_post = Dataset(function = post,
                       name = 'post',
                       url = url,
                       remote_dir = 'temp_remote',
                       local_dir = 'temp_local',
                       mpi = 1,
                       omp = 1,
                       nodes = 1,
                       skip = False)

dataset_init.set_downstream(dataset_calc)  # new option!
dataset_calc.set_downstream(dataset_post)

The new option to pay attention to here is set_downstream, which also has a mirror in set_upstream. These dictate the order in which the datasets will be run.
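
For illustration, the same chain could equally be declared from the downstream side; this sketch with set_upstream builds a network equivalent to the two set_downstream calls above:

# equivalent declaration of the same chain, from the child side
dataset_calc.set_upstream(dataset_init)
dataset_post.set_upstream(dataset_calc)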

Doing this creates a global Dependency object that can be accessed from any of the Datasets:

[5]:
dataset_post.dependency.network
[5]:
[(dataset-init-40221af0, dataset-calc-dce4a953),
 (dataset-calc-dce4a953, dataset-post-794d5d4f)]
[6]:
dataset_init.dependency is dataset_calc.dependency is dataset_post.dependency
[6]:
True

Here we can see the two “edges” of this network: first a connection between init and calc, then a second one between calc and post.

There are also some extra methods to check whether a dataset has parents or children:

[7]:
print('init:')
print('Is parent?', dataset_init.is_parent)
print('Is child?', dataset_init.is_child)

print('\ncalc:')
print('Is parent?', dataset_calc.is_parent)
print('Is child?', dataset_calc.is_child)
init:
Is parent? True
Is child? False

calc:
Is parent? True
Is child? True

You may append runs to any dataset within this chain, and a runner will be created in every dataset. When appending runs, you must include all arguments. Best practice here is to select one Dataset within the chain to append your runs to, and stick to it.

Note

Runners will be created with the same arguments in all datasets. This allows args to be specified in more than one dataset; however, it does mean you must be aware of this when defining your functions.

Important

You can avoid “chaining” run_args through to the other datasets by passing chain_run_args=False when appending the run. That way, any args such as mpi, omp, etc. will be passed only to the dataset on which the append was called.
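
As a sketch (not executed as part of this walkthrough), that might look as follows if you wanted heavier run args to apply only to the calculation dataset; the values here are illustrative:

# illustrative only: mpi and nodes stay on dataset_calc and are not
# chained through to dataset_init or dataset_post
dataset_calc.append_run(args={'x': 5, 'y': 2, 'offset': 10},
                        mpi=64,
                        nodes=50,
                        chain_run_args=False)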

[8]:
dataset_post.append_run(args={'x': 5, 'y': 2, 'offset': 10})
dataset_post.append_run(args={'x': 3, 'y': 7, 'offset': 0})
appended run runner-0
appended run runner-0
appended run runner-0
appended run runner-1
appended run runner-1
appended run runner-1

Now that we have appended runs to one dataset, we can see that runners have been added to all three:

[9]:
print(f'initial dataset has {len(dataset_init.runners)} runners')
print(f'calculation dataset has {len(dataset_calc.runners)} runners')
print(f'postprocess dataset has {len(dataset_post.runners)} runners')
initial dataset has 2 runners
calculation dataset has 2 runners
postprocess dataset has 2 runners

Running

Datasets can be run as normal; submitting from any dataset in the chain initiates the whole run:

[10]:
dataset_calc.run()
Staging Dependency
  [0] dataset-init-40221af0... Done, 2/2 Runners staged
  [1] dataset-calc-dce4a953... Done, 2/2 Runners staged
  [2] dataset-post-794d5d4f... Done, 2/2 Runners staged
Done
Transferring for 8 Runners
Transferring 15 Files... Done
Remotely executing 2 Runners
[10]:
True
[11]:
dataset_post.wait(1, timeout=10)
[12]:
dataset_post.fetch_results()
Fetching results
Transferring 4 Files... Done
[13]:
print(dataset_post.results)
['The final result is 20', 'The final result is 21']

Checking interim results

Here, our output looks as we expect, but what do we do if it doesn't? We could start by investigating the calculation dataset, as in a real case it is the most likely source of problems.

Internally, these datasets are no different to any other, so all their methods work exactly as you'd expect. The only limitations are the ones mentioned previously about appending runners and running parents.

[14]:
dataset_init.fetch_results()
print(dataset_init.results)
Fetching results
Transferring 4 Files... Done
[10, 0]
[15]:
print(dataset_init.states)
[RunnerState("satisfied"), RunnerState("satisfied")]

Note

A final note on branching jobs: at present, jobs having multiple children are supported in a limited sense. Your job tree can expand as you go down the chain; however, the opposite is not true. You cannot “merge” returned values into a single child.
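
As a minimal sketch of such an expanding tree, assuming it is declared by calling set_downstream more than once on the same parent (the names here are hypothetical, and given the warning at the top of this page you should check the release notes before relying on this):

# one parent feeding two children: the tree can widen going down,
# but two parents cannot be merged into a single child
branch_parent = Dataset(init, name='branch_parent', url=url, skip=False)
branch_child_a = Dataset(post, name='branch_child_a', url=url, skip=False)
branch_child_b = Dataset(post, name='branch_child_b', url=url, skip=False)

branch_parent.set_downstream(branch_child_a)
branch_parent.set_downstream(branch_child_b)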

Environment Variables

You can set environment variables with the extra keyword arg in datasets (or runners). With dependencies, there is some additional behaviour to note.

Any extra set at the Dataset level will be applied to that dataset.

So if you add extra="export KEY='VAR'" to a parent, it will be available to the children, but not in reverse.

[16]:
from remotemanager import RemoteFunction

@RemoteFunction
def search_env(nvars):
    import os

    found = {}
    for i in range(nvars):
        varname = f"var{i+1}"
        found[varname] = os.environ.get(varname, None)

    return found


def parent(nvars):
    return search_env(nvars)

def child(nvars):

    child_vars = search_env(nvars)

    output = {}
    for key in child_vars:
        output[key] = [loaded[key], child_vars[key]]

    return output

ds_1 = Dataset(parent, skip=False, extra="export var1='parentvar'")
ds_2 = Dataset(child, skip=False, extra="export var2='childvar'")

ds_1.set_downstream(ds_2)

ds_1.append_run({"nvars": 4}, extra="export var3='appendvar'")

ds_2.run(extra="export var4='runvar'")

ds_2.wait(1, 10)

ds_2.fetch_results()

ds_2.results
appended run runner-0
appended run runner-0
Staging Dependency
  [0] dataset-dataset-b0832d53... Done, 1/1 Runners staged
  [1] dataset-dataset-1eaa68f9... Done, 1/1 Runners staged
Done
Transferring for 2 Runners
Transferring 7 Files... Done
Remotely executing 1 Runner
Fetching results
Transferring 2 Files... Done
[16]:
[{'var1': ['parentvar', 'parentvar'],
  'var2': [None, 'childvar'],
  'var3': ['appendvar', 'appendvar'],
  'var4': ['runvar', 'runvar']}]

Let's analyse these results.

var1 was added at the Dataset level on the parent, so it is exported in the first jobscript.

var2 was added at the Dataset level on the child, so it is exported in the second jobscript.

Because of this, var1 is available in both the parent and the child, whereas var2 is only available in the child.

var3 was added at the append level and is propagated to both appended runners (one for each dataset).

var4 is similar: since it is set at the run level it is temporary, but it is added to both datasets.